package com.aries.dubbo.like.rpc.client.bootstrap;

import com.aries.dubbo.like.rpc.client.handler.RpcClientHandler;
import com.aries.dubbo.like.rpc.client.channel.cache.AddressChannelMap;
import com.aries.dubbo.like.rpc.common.codec.ByteBufConst;
import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.Unpooled;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.DelimiterBasedFrameDecoder;
import lombok.extern.slf4j.Slf4j;

import java.util.Hashtable;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;

/**
 * Created with IntelliJ IDEA.
 * Author: aries
 * Date: 2018/8/23
 * Description: rpc client端启动类。主要是连接到远程服务。
 */
@Slf4j
public class ClientBootStrap {

    private static final List<Thread> THREAD_LIST = new LinkedList<>();
    private static final Hashtable<ChannelFuture, EventLoopGroup> CHANNEL_EVENT_LOOP_GROUP_MAP = new Hashtable<>(8, 2.0F);

    public static void connect(String host, int port, CountDownLatch countDownLatch, Thread thread) throws Exception {
        THREAD_LIST.add(thread);

        EventLoopGroup group = new NioEventLoopGroup(2);
        try {
            Bootstrap bootstrap = new Bootstrap();
            bootstrap.group(group)
                    .channel(NioSocketChannel.class)
                    .option(ChannelOption.SO_KEEPALIVE, true)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel socketChannel) throws Exception {
                            try {
                                socketChannel
                                        .pipeline()
                                        .addLast(new DelimiterBasedFrameDecoder(10000,
                                                Unpooled.copiedBuffer(ByteBufConst.Delimiter())));
                                //写入分隔符，防止沾包
                                socketChannel
                                        .pipeline()
                                        .addLast(new RpcClientHandler());
                            } catch (Exception e) {
                                throw new RuntimeException("", e);
                            }
                        }
                    });
            ChannelFuture future = bootstrap.connect(host, port).sync();
            //将host和channel放在map中缓存。（ServiceProxy会优先从map中根据host:port取出channel，如果没有，先连接再返回）
            AddressChannelMap.add(host + ":" + port, future.channel());
            countDownLatch.countDown();
            //将future和group放入map，方便资源关闭。
            CHANNEL_EVENT_LOOP_GROUP_MAP.put(future.channel().closeFuture(), group);
            future.channel().closeFuture().sync();
        } catch (Exception e) {
            log.error("error to connect remote server:" + host + ":" + port, e);
            e.printStackTrace();
        } finally {
            group.shutdownGracefully();
        }
    }

    public static void shutDown() {
        THREAD_LIST.forEach(Thread::interrupt);
        CHANNEL_EVENT_LOOP_GROUP_MAP.forEach((k, v) -> {
            k.channel().close();
            v.shutdownGracefully();
        });
    }
}
